{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<a href=\"https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/retrievers/pathway_retriever.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pathway Retriever"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "> [Pathway](https://pathway.com/) is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.\n",
    "\n",
    "This notebook demonstrates how to set up a live data indexing pipeline. You can query the results of this pipeline from your LLM application using the provided `PathwayRetriever`. Under the hood, Pathway updates the index on each data change, so answers are always up to date.\n",
    "\n",
    "In this notebook, we will use a simple document processing pipeline that:\n",
    "\n",
    "1. Monitors several data sources (files, S3 folders, cloud storage) for data changes.\n",
    "2. Parses, splits and embeds the documents using LlamaIndex methods.\n",
    "3. Builds a vector index for the data.\n",
    "\n",
    "We will connect to the index using the `llama_index.retrievers.PathwayRetriever` retriever, which implements the `retrieve` interface.\n",
    "\n",
    "The basic pipeline described in this document lets you build a simple index of files stored in a cloud location with little effort. However, Pathway provides everything needed to build realtime data pipelines and apps, including SQL-like operations such as groupby-reductions and joins between disparate data sources, time-based grouping and windowing of data, and a wide array of connectors.\n",
    "\n",
    "For more details about the Pathway data ingestion pipeline and vector store, see [vector store pipeline](https://pathway.com/developers/showcases/vectorstore_pipeline)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prerequisites\n",
    "\n",
    "Install the `pathway` and `llama-index` packages, then download the sample data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install pathway\n",
    "!pip install llama-index\n",
    "\n",
    "!mkdir -p 'data/'\n",
    "!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Set up your OpenAI API key."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "OpenAI API Key: ········\n"
     ]
    }
   ],
   "source": [
    "import getpass\n",
    "import os\n",
    "\n",
    "# omit if embedder of choice is not OpenAI\n",
    "if \"OPENAI_API_KEY\" not in os.environ:\n",
    "    os.environ[\"OPENAI_API_KEY\"] = getpass.getpass(\"OpenAI API Key:\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Configure logging."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import logging\n",
    "import sys\n",
    "\n",
    "logging.basicConfig(stream=sys.stdout, level=logging.ERROR)\n",
    "logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define data sources tracked by Pathway"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Pathway can listen for data changes on many sources simultaneously, such as local files, S3 folders, cloud storage, and any data stream.\n",
    "\n",
    "See [pathway-io](https://pathway.com/developers/api-docs/pathway-io) for more information."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pathway as pw\n",
    "\n",
    "data_sources = []\n",
    "data_sources.append(\n",
    "    pw.io.fs.read(\n",
    "        \"./data\",\n",
    "        format=\"binary\",\n",
    "        mode=\"streaming\",\n",
    "        with_metadata=True,\n",
    "    )  # This creates a `pathway` connector that tracks\n",
    "    # all the files in the ./data directory\n",
    ")\n",
    "\n",
    "# This creates a connector that tracks files in Google Drive.\n",
    "# Follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials.\n",
    "# data_sources.append(\n",
    "#     pw.io.gdrive.read(object_id=\"17H4YpBOAKQzEJ93xmC2z170l0bP2npMy\", service_user_credentials_file=\"credentials.json\", with_metadata=True))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create the document indexing pipeline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let us create the document indexing pipeline. The `transformations` should be a list of `TransformComponent`s ending with an `Embedding` transformation.\n",
    "\n",
    "In this example, we first split the text using `TokenTextSplitter`, then embed it with `OpenAIEmbedding`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<Thread(Thread-5 (run), started 140158811412032)>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "======== Running on http://127.0.0.1:8754 ========\n",
      "(Press CTRL+C to quit)\n",
      "148\n"
     ]
    }
   ],
   "source": [
    "from llama_index.retrievers import PathwayVectorServer\n",
    "from llama_index.embeddings import OpenAIEmbedding\n",
    "from llama_index.node_parser import TokenTextSplitter\n",
    "\n",
    "embed_model = OpenAIEmbedding(embed_batch_size=10)\n",
    "\n",
    "transformations_example = [\n",
    "    TokenTextSplitter(\n",
    "        chunk_size=150,\n",
    "        chunk_overlap=10,\n",
    "        separator=\" \",\n",
    "    ),\n",
    "    embed_model,\n",
    "]\n",
    "\n",
    "processing_pipeline = PathwayVectorServer(\n",
    "    *data_sources,\n",
    "    transformations=transformations_example,\n",
    ")\n",
    "\n",
    "# Define the host and port that the Pathway server will listen on\n",
    "PATHWAY_HOST = \"127.0.0.1\"\n",
    "PATHWAY_PORT = 8754\n",
    "\n",
    "# `threaded` runs Pathway in detached mode; set it to False when running from a terminal or container.\n",
    "# For more information on `with_cache`, see https://pathway.com/developers/api-docs/persistence-api\n",
    "processing_pipeline.run_server(\n",
    "    host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create the retriever for LlamaIndex"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[NodeWithScore(node=TextNode(id_='7f507732-bf58-4a45-948c-dc6bb762812e', embedding=None, metadata={'created_at': None, 'modified_at': 1703700445, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/retrievers/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='28f5c0d0e8cae502a0c4d7a72947449c6297f56196fa5c830ebb889026e4ccb9', text=\"If you want to do streaming in Python, build an AI data pipeline, or if you are looking for your next Python data processing framework, keep reading.\\n\\nPathway provides a high-level programming interface in Python for defining data transformations, aggregations, and other operations on data streams.\\nWith Pathway, you can effortlessly design and deploy sophisticated data workflows that efficiently handle high volumes of data in real time.\\n\\nPathway is interoperable with various data sources and sinks such as Kafka, CSV files, SQL/noSQL databases, and REST API's, allowing you to connect and process data from different storage systems.\\n\\nTypical use-cases of Pathway include realtime data processing, ETL (Extract, Transform, Load) pipelines, data analytics,\", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\\n\\n{content}', metadata_template='{key}: {value}', metadata_seperator='\\n'), score=0.16584277747494724),\n",
       " NodeWithScore(node=TextNode(id_='274b0481-dedd-41a8-8d94-d1319cd4bc2f', embedding=None, metadata={'created_at': None, 'modified_at': 1703700445, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/retrievers/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='1c57f8f0d152933320cca745a42239f5ef7c70966c1e5b52fc83dfd515243012', text=\"Pathway is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API's for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.\\n\\nIn the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Still, Pathway is a powerful tool that can be used for a lot of things. If you want to do streaming in Python,\", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\\n\\n{content}', metadata_template='{key}: {value}', metadata_seperator='\\n'), score=0.1441072441651936),\n",
       " NodeWithScore(node=TextNode(id_='be82f00d-553a-47df-b639-5fc15ee7bdc8', embedding=None, metadata={'created_at': None, 'modified_at': 1703700445, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/retrievers/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='625d489d1a465cbfcb4f50fb84a5e29ca07cffeb3c5ef900e530822e9ff8cf7f', text=\"Get Help\\n\\nIf you have any questions, issues, or just want to chat about Pathway, we're here to help! Feel free to:\\n- Check out the documentation in https://pathway.com/developers/ for detailed information.\\n- Reach out to us via email at contact@pathway.com.\\n\\nOur team is always happy to help you and ensure that you get the most out of Pathway.\\nIf you would like to better understand how best to use Pathway in your project, please don't hesitate to reach out to us.\", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\\n\\n{content}', metadata_template='{key}: {value}', metadata_seperator='\\n'), score=0.186929683249915)]"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from llama_index.retrievers import PathwayRetriever\n",
    "\n",
    "retriever = PathwayRetriever(host=PATHWAY_HOST, port=PATHWAY_PORT)\n",
    "retriever.retrieve(str_or_query_bundle=\"what is pathway\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Your turn!** Now edit the contents of the source file, or upload a new file to the `./data` directory, and repeat the query. The set of retrieved documents will reflect the changes!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Use in Query Engine"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from llama_index.query_engine import RetrieverQueryEngine\n",
    "\n",
    "query_engine = RetrieverQueryEngine.from_args(\n",
    "    retriever,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Pathway is an open framework for high-throughput and low-latency real-time data processing. It provides a high-level programming interface in Python for defining data transformations, aggregations, and other operations on data streams. With Pathway, you can effortlessly design and deploy sophisticated data workflows that efficiently handle high volumes of data in real time. It is interoperable with various data sources and sinks such as Kafka, CSV files, SQL/noSQL databases, and REST API's, allowing you to connect and process data from different storage systems. Pathway was designed to be a life-saver for Python developers and ML/AI engineers faced with live data sources, where quick reactions to fresh data are necessary. It can be used for a variety of purposes including streaming in Python, building AI data pipelines, and general data processing tasks. If you have any questions or need assistance with Pathway, you can check out the documentation on the official website or reach out to the team via email.\n"
     ]
    }
   ],
   "source": [
    "response = query_engine.query(\"Tell me about Pathway\")\n",
    "print(str(response))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
